package cn.atcoder.air.client;

import cn.atcoder.air.exception.ConnectAuthException;
import cn.atcoder.air.exception.MessageException;
import cn.atcoder.air.exception.NoProviderException;
import cn.atcoder.air.msg.*;
import cn.atcoder.air.transport.ClientTransportFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @author yangjunda1
 * @description
 * @date 5/9/19 4:46 PM
 */
public class ClientChannelHandler extends SimpleChannelInboundHandler<BaseMessage> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClientChannelHandler.class);

    /**
     * 利用写空闲发送心跳检测消息
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    ctx.writeAndFlush(MessageBuilder.buildHeartbeatRequest());
                    LOGGER.debug("Send heartbeat to provider:{} ...", ClientTransportFactory.refer().getRemoteAddress());

                    if (!ClientTransportFactory.refer().tryHeartbeat()) {
                        LOGGER.warn("Send heartbeat failed over 30 times, try reconnect {} provider", ClientTransportFactory.refer().getRemoteAddress());
                        ClientTransportFactory.refer().doConnect();
                    }
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * 当掉线后，重新连接
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        LOGGER.debug("Retry connect to provider:{} ...", ClientTransportFactory.refer().getRemoteAddress());
        ClientTransportFactory.refer().doConnect();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, BaseMessage baseMessage) throws Exception {
        Channel channel = channelHandlerContext.channel();
        MessageType messageType = baseMessage.getMessageType();
        switch (messageType) {
            case HEARTBEAT_REQUEST_MSG:
                HeartbeatResponseMessage msg = MessageBuilder.buildHeartbeatResponse(baseMessage);
                channel.writeAndFlush(msg);
                break;
            case CALLBACK_REQUEST_MSG:
                break;
            case HEARTBEAT_RESPONSE_MSG:
                LOGGER.debug("Receive heartbeat from provider:{} ...", ClientTransportFactory.refer().getRemoteAddress());
                ClientTransportFactory.refer().shakeHandedSuccess();
                ClientTransportFactory.refer().heartbeatSuccess();
                break;
            case CALLBACK_RESPONSE_MSG:
                final CallbackResponseMessage callbackResponseMessage = (CallbackResponseMessage) baseMessage;
                if (callbackResponseMessage.isError()) {
                    throw (RuntimeException)callbackResponseMessage.getException();
                }
                ClientTransportFactory.refer().receiveResponse((ResponseMessage) baseMessage);
                break;
            default:break;
        }
        ReferenceCountUtil.release(baseMessage);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        if (cause instanceof IOException) {
            LOGGER.warn("catch IOException at {} : {}",
                    channel.localAddress()+"->"+channel.remoteAddress(),
                    cause.getMessage());
        } else if (cause instanceof NoProviderException) {
            NoProviderException e = (NoProviderException) cause;
            LOGGER.error("execute method:{}.{} -> No provider",e.getClazzName(),e.getMethodName(),cause);
        } else if (cause instanceof MessageException) {
            MessageException rpcException = (MessageException) cause;
            final MessageHeader header = rpcException.getHeader();
            if (header != null && header.getMessageType() == MessageType.CALLBACK_REQUEST_MSG) {
                ResponseMessage response = new ResponseMessage();
                response.getHeader().copyHeader(header);
                response.getHeader().setMessageType(MessageType.CALLBACK_RESPONSE_MSG);
                Future future = channel.writeAndFlush(response);
                future.addListener(new FutureListener() {
                    @Override
                    public void operationComplete(Future future) throws Exception {
                        if (future.isSuccess()) {
                            LOGGER.debug("error of callback msg has been send to serverside..{}", header);
                            return;
                        } else {
                            LOGGER.error("error of callback msg to the serverSide have failed. {}", header);
                            LOGGER.error(cause.getMessage(), cause);
                        }
                    }
                });
            }
        } else {
            LOGGER.warn("catch " + cause.getClass().getName() + " at {} : {}",
                    channel.localAddress()+"->"+channel.remoteAddress(),
                    cause.getMessage());
        }
    }
}
